package com.xiaofan.hotitems_analysis

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import java.util.Properties

/**
 * Test utility for sending bulk data: reads the records in UserBehavior.csv
 * and publishes them to Kafka (producer side).
 *
 * Date: 2020/4/29 11:11
 *
 * @author yangShen
 */
object KafkaProducer {
  /** Entry point: publishes the CSV contents to the "hot_items" topic. */
  def main(args: Array[String]): Unit = {
    writeToKafka("hot_items")
  }

  /**
   * Reads UserBehavior.csv line by line and sends each line, without a key,
   * as one String record to the given Kafka topic.
   *
   * Both the file handle and the producer are released even when reading or
   * sending fails part-way through.
   *
   * @param topic name of the Kafka topic the records are sent to
   */
  def writeToKafka(topic: String): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091")
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Define a Kafka producer.
    val producer = new KafkaProducer[String, String](properties)
    try {
      // Read the data from the file and send it.
      val bufferedSource = io.Source.fromFile("D:\\big-data\\code\\UserBehaviorAnalysis\\HotItemsAnalysis\\src\\main\\resources\\UserBehavior.csv")
      try {
        for (line <- bufferedSource.getLines()) {
          val record = new ProducerRecord[String, String](topic, line)
          producer.send(record)
        }
      } finally {
        // Release the file handle; the original never closed the source.
        bufferedSource.close()
      }
    } finally {
      // Always close the producer, including when opening or reading the file throws.
      producer.close()
    }
  }
}
